package day03.sink;

import beans.SensorReading;
import day02.transform.FlinkTransform00;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Flink streaming API - sink.
 * <p>
 * Demo job that writes a stream of {@link SensorReading} records to Redis.
 *
 * @author lvbingbing
 * @date 2021-12-20 08:18
 */
public class FlinkSink01 {

    /** Host name of the Redis server the sink connects to. */
    private static final String REDIS_HOST = "hadoop102";

    /** Additional key handed to the HSET command description. */
    private static final String REDIS_HASH_NAME = "sensor_temp";

    /**
     * Entry point: builds the source stream, attaches the Redis sink and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from the helper construction or from job execution
     */
    public static void main(String[] args) throws Exception {
        // Step 1: build the helper with parallelism 1. Per the original author's note, its
        // parameterized constructor initializes the environment and reads the data from a file.
        FlinkTransform00 transformHelper = new FlinkTransform00(1);

        // Step 2: fetch the execution environment first, then the sensor stream.
        StreamExecutionEnvironment executionEnv = transformHelper.getEnv();
        DataStream<SensorReading> readings = transformHelper.getSensorReadingStream();

        // Step 3: attach the Redis sink to the stream.
        studyWriteToRedis(readings);

        // Step 4: trigger execution of the job.
        executionEnv.execute();
    }

    /**
     * Attaches a {@link RedisSink} to the given stream.
     * <p>
     * Only the host is configured on the pool builder; every other setting keeps
     * whatever default the builder provides.
     *
     * @param sensorReadingStream stream of sensor readings to be written to Redis
     */
    private static void studyWriteToRedis(DataStream<SensorReading> sensorReadingStream) {
        FlinkJedisPoolConfig.Builder poolBuilder = new FlinkJedisPoolConfig.Builder();
        poolBuilder.setHost(REDIS_HOST);
        FlinkJedisPoolConfig poolConfig = poolBuilder.build();

        RedisSink<SensorReading> redisSink = new RedisSink<>(poolConfig, new MyRedisMapper());
        sensorReadingStream.addSink(redisSink);
    }

    /**
     * Redis mapper that tells the sink which command to issue and how to derive
     * the key and the value from each {@link SensorReading}.
     */
    static class MyRedisMapper implements RedisMapper<SensorReading> {

        /**
         * Describes the Redis command used to persist each record.
         *
         * @return a description of the HSET command with {@code "sensor_temp"} as its
         *         additional key, i.e. the records are stored in a hash
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            RedisCommand command = RedisCommand.HSET;
            return new RedisCommandDescription(command, REDIS_HASH_NAME);
        }

        /**
         * Supplies the key for a record.
         *
         * @param reading the record being written
         * @return the reading's id
         */
        @Override
        public String getKeyFromData(SensorReading reading) {
            return reading.getId();
        }

        /**
         * Supplies the value for a record.
         *
         * @param reading the record being written
         * @return the reading's temperature rendered via {@code toString()}
         */
        @Override
        public String getValueFromData(SensorReading reading) {
            return reading.getTemperature().toString();
        }
    }
}
